<?php

namespace App\Http\Controller\Test;

use Swoft\Http\Server\Annotation\Mapping\Controller;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

/**
 * Class MqProducerController
 * @Controller()
 */
class MqProducerController extends BaseController {

    /**
     * 扇出交换机
     */
    public function fanout($data)
    {
        $param = config('AMQP');
        $amqpDetail = config('goods_queue');  // goods_queue 或者 order_queue
        $connection = new AMQPStreamConnection(
            $param['host'], //主机名
            $param['port'], //端口号
            $param['login'], //rabbitmq用户名
            $param['password'], //rabbitmq密码
            $param['vhost']   //虚拟主机【起到消息隔离的作用】，在此不详解
        );

        //将要发送数据变为json字符串
        $messageBody = json_encode($data);

        //连接信道
        $channel = $connection->channel();

        /*
         * 流量控制 Specifies QoS
         *      消费者在开启acknowledge的情况下，对接收到的消息需要异步对消息进行确认
         *      由于消费者自身处理能力有限，从rabbitmq获取一定数量的消息后，希望rabbitmq不再将队列中的消息推送过来，
         *      当对消息处理完后（即对消息进行了ack，并且有能力处理更多的消息）再接收来自队列的消息
         * @param int $prefetch_size   最大unacked消息的字节数
         * @param int $prefetch_count  最大unacked消息的条数
         * @param bool $a_global       上述限制的限定对象，false限制单个消费者，true限制整个通道
         * @return mixed
         */
        $channel->basic_qos(0, 1, false);

        /*
         * 创建交换机(Exchange)
         * name: vckai_exchange// 交换机名称
         * type: direct        // 交换机类型，分别为direct/fanout/topic，参考另外文章的Exchange Type说明。
         * passive: false      // 如果设置true存在则返回OK，否则就报错。设置false存在返回OK，不存在则自动创建
         * durable: false      // 是否持久化，设置false是存放到内存中的，RabbitMQ重启后会丢失
         * auto_delete: false  // 是否自动删除，当最后一个消费者断开连接之后队列是否自动被删除
         */
        $channel->exchange_declare($amqpDetail['exchange_name'], $amqpDetail['exchange_type'], false, true, false);

        /*
         * 创建AMQP消息类型
         * $messageBody:消息体
         * delivery_mode 消息是否持久化
         *      AMQPMessage::DELIVERY_MODE_NON_PERSISTENT = 1; 不持久化
         *      AMQPMessage::DELIVERY_MODE_PERSISTENT = 2; 持久化
         */
        $message = new AMQPMessage($messageBody, array('content_type' => 'text/plain', 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT));

        /*
         * 发送消息
         * msg       // AMQP消息内容
         * exchange  // 交换机名称
         */
        $channel->basic_publish($message, $amqpDetail['exchange_name']);
        $channel->close();
        $connection->close();
        return "fanout";
    }

    /**
     * 主题交换机
     */
    public function topic($data)
    {
        $param      = config('AMQP');
        $amqpDetail = config('topic_queue');
        $connection = new AMQPStreamConnection(
            $param['host'], //主机名
            $param['port'], //端口号
            $param['login'], //rabbitmq用户名
            $param['password'], //rabbitmq密码
            $param['vhost']   //虚拟主机【起到消息隔离的作用】，在此不详解
        );

        $messageBody = json_encode($data);

        $channel = $connection->channel();

        $channel->basic_qos(0, 1, false);

        //主题，无需创建交换机
        //$channel->exchange_declare($amqpDetail['exchange_name'], $amqpDetail['exchange_type'], false, true, false);
        $message = new AMQPMessage($messageBody, array('content_type' => 'text/plain', 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT));

        //可设置循环次数
        $channel->basic_publish($message, $amqpDetail['exchange_name'], $amqpDetail['route_key']);
        $channel->close();
        $connection->close();
        return  "topic";
    }

}